/*
 * Copyright (c) 2015 The Jupiter Project
 *
 * Licensed under the Apache License, version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.inyourcode.core.transport.session;

import com.inyourcode.core.GlobalConstants;
import com.inyourcode.core.util.Signal;
import com.inyourcode.core.util.StackTraceUtil;
import com.inyourcode.core.transport.api.JConnection;
import com.inyourcode.core.transport.api.JOption;
import com.inyourcode.core.transport.api.UnresolvedAddress;
import com.inyourcode.core.transport.api.channel.JChannel;
import com.inyourcode.core.transport.api.channel.JChannelGroup;
import com.inyourcode.core.transport.api.exception.ConnectFailedException;
import com.inyourcode.core.transport.api.exception.IoSignals;
import com.inyourcode.core.transport.api.processor.ConsumerProcessor;
import com.inyourcode.core.transport.netty.JNettyConnection;
import com.inyourcode.core.transport.netty.NettyTcpConnector;
import com.inyourcode.core.transport.netty.TcpChannelProvider;
import com.inyourcode.core.transport.netty.channel.NettyChannel;
import com.inyourcode.core.transport.netty.handler.IdleStateChecker;
import com.inyourcode.core.transport.netty.handler.ProtocolDecoder;
import com.inyourcode.core.transport.netty.handler.ProtocolEncoder;
import com.inyourcode.core.transport.netty.handler.connector.ConnectionWatchdog;
import com.inyourcode.core.transport.netty.handler.connector.ConnectorIdleStateTrigger;
import com.inyourcode.core.transport.session.head.MessageHeadDecoder;
import com.inyourcode.core.transport.session.head.MessageHeadEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;

/**
 *
 * @author JackLei
 */
public class BasicTcpConnector extends NettyTcpConnector {

    // handlers
    private final ConnectorIdleStateTrigger idleStateTrigger = new ConnectorIdleStateTrigger();
    private final ProtocolEncoder encoder = new ProtocolEncoder();
    private SessionConnectorHandler handler;

    public BasicTcpConnector() {}

    public BasicTcpConnector(boolean nativeEt) {
        super(nativeEt);
    }

    public BasicTcpConnector(int nWorkers) {
        super(nWorkers);
    }

    public BasicTcpConnector(int nWorkers, boolean nativeEt) {
        super(nWorkers, nativeEt);
    }

    @Override
    protected void doInit() {
        // child options
        config().setOption(JOption.SO_REUSEADDR, true);
        config().setOption(JOption.CONNECT_TIMEOUT_MILLIS, (int) TimeUnit.SECONDS.toMillis(3));
        // channel factory
        if (isNativeEt()) {
            bootstrap().channelFactory(TcpChannelProvider.NATIVE_CONNECTOR);
        } else {
            bootstrap().channelFactory(TcpChannelProvider.NIO_CONNECTOR);
        }

        handler = new SessionConnectorHandler();
        handler.processor = (new BasicConsumerProcessor());
    }

    @Override
    public void withProcessor(ConsumerProcessor processor) {
        throw  new RuntimeException("This method is not supported");
    }

    @Override
    public JConnection connect(UnresolvedAddress address, boolean async) {
        setOptions();

        final Bootstrap boot = bootstrap();
        final SocketAddress socketAddress = InetSocketAddress.createUnresolved(address.getHost(), address.getPort());
        final JChannelGroup group = group(address);

        // 重连watchdog
        final ConnectionWatchdog watchdog = new ConnectionWatchdog(boot, timer, socketAddress, group) {

            @Override
            public ChannelHandler[] handlers() {
                return new ChannelHandler[] {
                        this,
                        new IdleStateChecker(timer, 0, GlobalConstants.WRITER_IDLE_TIME_SECONDS, 0),
                        idleStateTrigger,
                        new ProtocolDecoder(),
                        new MessageHeadDecoder(),
                        encoder,
                        new MessageHeadEncoder(),
                        handler
                };
            }
        };
        watchdog.start();

        ChannelFuture future;
        try {
            synchronized (bootstrapLock()) {
                boot.handler(new ChannelInitializer<Channel>() {

                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(watchdog.handlers());
                    }
                });

                future = boot.connect(socketAddress);
            }

            // 以下代码在synchronized同步块外面是安全的
            if (!async) {
                future.sync();
            }
        } catch (Throwable t) {
            throw new ConnectFailedException("connects to [" + address + "] fails", t);
        }

        return new JNettyConnection(address, future) {

            @Override
            public void setReconnect(boolean reconnect) {
                if (reconnect) {
                    watchdog.start();
                } else {
                    watchdog.stop();
                }
            }
        };
    }

    @ChannelHandler.Sharable
    static class SessionConnectorHandler extends ChannelInboundHandlerAdapter {

        private static final Logger logger = LoggerFactory.getLogger(SessionConnectorHandler.class);

        private ConsumerProcessor processor;

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof ResponseContext) {
                JChannel jChannel = NettyChannel.attachChannel(ctx.channel());
                try {
                    processor.handleResponse(jChannel, msg);
                } catch (Throwable t) {
                    logger.error("An exception has been caught {}, on {} #channelRead().", StackTraceUtil.stackTrace(t), jChannel);
                }
            } else {
                logger.warn("Unexpected message type received: {}.", msg.getClass());

                ReferenceCountUtil.release(msg);
            }
        }

        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            Channel ch = ctx.channel();

            // 高水位线: ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK
            // 低水位线: ChannelOption.WRITE_BUFFER_LOW_WATER_MARK
            if (!ch.isWritable()) {
                // 当前channel的缓冲区(OutboundBuffer)大小超过了WRITE_BUFFER_HIGH_WATER_MARK
                logger.warn("{} is not writable, high water mask: {}, the number of flushed entries that are not written yet: {}.",
                        ch, ch.config().getWriteBufferHighWaterMark(), ch.unsafe().outboundBuffer().size());

                ch.config().setAutoRead(false);
            } else {
                // 曾经高于高水位线的OutboundBuffer现在已经低于WRITE_BUFFER_LOW_WATER_MARK了
                logger.warn("{} is writable(rehabilitate), low water mask: {}, the number of flushed entries that are not written yet: {}.",
                        ch, ch.config().getWriteBufferLowWaterMark(), ch.unsafe().outboundBuffer().size());

                ch.config().setAutoRead(true);
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            JChannel jChannel = NettyChannel.attachChannel(ctx.channel());
            if (cause instanceof Signal) {
                IoSignals.handleSignal((Signal) cause, jChannel);
            } else {
                logger.error("An exception has been caught {}, on {}.", StackTraceUtil.stackTrace(cause), jChannel);
            }
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            processor.channelActive(ctx);
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            super.channelInactive(ctx);
            processor.channelInactive(ctx);
        }

        public ConsumerProcessor processor() {
            return processor;
        }

        public void processor(ConsumerProcessor processor) {
            this.processor = processor;
        }
    }

}
